1   /**
2    * Copyright 2014 Netflix, Inc.
3    * 
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    * 
8    * http://www.apache.org/licenses/LICENSE-2.0
9    * 
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package rx.internal.operators;
17  
18  import static org.junit.Assert.assertArrayEquals;
19  import static org.junit.Assert.assertEquals;
20  import static org.junit.Assert.assertNotNull;
21  import static org.junit.Assert.assertTrue;
22  import static org.mockito.Matchers.any;
23  import static org.mockito.Matchers.anyInt;
24  import static org.mockito.Mockito.mock;
25  import static org.mockito.Mockito.never;
26  import static org.mockito.Mockito.verify;
27  
28  import java.util.ArrayList;
29  import java.util.Arrays;
30  import java.util.Collection;
31  import java.util.List;
32  import java.util.Map;
33  import java.util.concurrent.ConcurrentHashMap;
34  import java.util.concurrent.ConcurrentLinkedQueue;
35  import java.util.concurrent.CountDownLatch;
36  import java.util.concurrent.TimeUnit;
37  import java.util.concurrent.atomic.AtomicInteger;
38  import java.util.concurrent.atomic.AtomicReference;
39  
40  import org.junit.Before;
41  import org.junit.Test;
42  import org.mockito.Matchers;
43  import org.mockito.MockitoAnnotations;
44  
45  import rx.Notification;
46  import rx.Observable;
47  import rx.Observable.OnSubscribe;
48  import rx.Observer;
49  import rx.Subscriber;
50  import rx.Subscription;
51  import rx.exceptions.TestException;
52  import rx.functions.Action0;
53  import rx.functions.Action1;
54  import rx.functions.Func1;
55  import rx.internal.util.UtilityFunctions;
56  import rx.observables.GroupedObservable;
57  import rx.observers.TestSubscriber;
58  import rx.schedulers.Schedulers;
59  
60  public class OperatorGroupByTest {
61  
62      final Func1<String, Integer> length = new Func1<String, Integer>() {
63          @Override
64          public Integer call(String s) {
65              return s.length();
66          }
67      };
68  
69      @Test
70      public void testGroupBy() {
71          Observable<String> source = Observable.just("one", "two", "three", "four", "five", "six");
72          Observable<GroupedObservable<Integer, String>> grouped = source.lift(new OperatorGroupBy<String, Integer, String>(length));
73  
74          Map<Integer, Collection<String>> map = toMap(grouped);
75  
76          assertEquals(3, map.size());
77          assertArrayEquals(Arrays.asList("one", "two", "six").toArray(), map.get(3).toArray());
78          assertArrayEquals(Arrays.asList("four", "five").toArray(), map.get(4).toArray());
79          assertArrayEquals(Arrays.asList("three").toArray(), map.get(5).toArray());
80      }
81  
82      @Test
83      public void testGroupByWithElementSelector() {
84          Observable<String> source = Observable.just("one", "two", "three", "four", "five", "six");
85          Observable<GroupedObservable<Integer, Integer>> grouped = source.lift(new OperatorGroupBy<String, Integer, Integer>(length, length));
86  
87          Map<Integer, Collection<Integer>> map = toMap(grouped);
88  
89          assertEquals(3, map.size());
90          assertArrayEquals(Arrays.asList(3, 3, 3).toArray(), map.get(3).toArray());
91          assertArrayEquals(Arrays.asList(4, 4).toArray(), map.get(4).toArray());
92          assertArrayEquals(Arrays.asList(5).toArray(), map.get(5).toArray());
93      }
94  
95      @Test
96      public void testGroupByWithElementSelector2() {
97          Observable<String> source = Observable.just("one", "two", "three", "four", "five", "six");
98          Observable<GroupedObservable<Integer, Integer>> grouped = source.groupBy(length, length);
99  
100         Map<Integer, Collection<Integer>> map = toMap(grouped);
101 
102         assertEquals(3, map.size());
103         assertArrayEquals(Arrays.asList(3, 3, 3).toArray(), map.get(3).toArray());
104         assertArrayEquals(Arrays.asList(4, 4).toArray(), map.get(4).toArray());
105         assertArrayEquals(Arrays.asList(5).toArray(), map.get(5).toArray());
106     }
107 
108     @Test
109     public void testEmpty() {
110         Observable<String> source = Observable.empty();
111         Observable<GroupedObservable<Integer, String>> grouped = source.lift(new OperatorGroupBy<String, Integer, String>(length));
112 
113         Map<Integer, Collection<String>> map = toMap(grouped);
114 
115         assertTrue(map.isEmpty());
116     }
117 
118     @Test
119     public void testError() {
120         Observable<String> sourceStrings = Observable.just("one", "two", "three", "four", "five", "six");
121         Observable<String> errorSource = Observable.error(new RuntimeException("forced failure"));
122         Observable<String> source = Observable.concat(sourceStrings, errorSource);
123 
124         Observable<GroupedObservable<Integer, String>> grouped = source.lift(new OperatorGroupBy<String, Integer, String>(length));
125 
126         final AtomicInteger groupCounter = new AtomicInteger();
127         final AtomicInteger eventCounter = new AtomicInteger();
128         final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
129 
130         grouped.flatMap(new Func1<GroupedObservable<Integer, String>, Observable<String>>() {
131 
132             @Override
133             public Observable<String> call(final GroupedObservable<Integer, String> o) {
134                 groupCounter.incrementAndGet();
135                 return o.map(new Func1<String, String>() {
136 
137                     @Override
138                     public String call(String v) {
139                         return "Event => key: " + o.getKey() + " value: " + v;
140                     }
141                 });
142             }
143         }).subscribe(new Subscriber<String>() {
144 
145             @Override
146             public void onCompleted() {
147 
148             }
149 
150             @Override
151             public void onError(Throwable e) {
152                 e.printStackTrace();
153                 error.set(e);
154             }
155 
156             @Override
157             public void onNext(String v) {
158                 eventCounter.incrementAndGet();
159                 System.out.println(v);
160 
161             }
162         });
163 
164         assertEquals(3, groupCounter.get());
165         assertEquals(6, eventCounter.get());
166         assertNotNull(error.get());
167     }
168 
169     private static <K, V> Map<K, Collection<V>> toMap(Observable<GroupedObservable<K, V>> observable) {
170 
171         final ConcurrentHashMap<K, Collection<V>> result = new ConcurrentHashMap<K, Collection<V>>();
172 
173         observable.toBlocking().forEach(new Action1<GroupedObservable<K, V>>() {
174 
175             @Override
176             public void call(final GroupedObservable<K, V> o) {
177                 result.put(o.getKey(), new ConcurrentLinkedQueue<V>());
178                 o.subscribe(new Action1<V>() {
179 
180                     @Override
181                     public void call(V v) {
182                         result.get(o.getKey()).add(v);
183                     }
184 
185                 });
186             }
187         });
188 
189         return result;
190     }
191 
192     /**
193      * Assert that only a single subscription to a stream occurs and that all events are received.
194      * 
195      * @throws Throwable
196      */
197     @Test
198     public void testGroupedEventStream() throws Throwable {
199 
200         final AtomicInteger eventCounter = new AtomicInteger();
201         final AtomicInteger subscribeCounter = new AtomicInteger();
202         final AtomicInteger groupCounter = new AtomicInteger();
203         final CountDownLatch latch = new CountDownLatch(1);
204         final int count = 100;
205         final int groupCount = 2;
206 
207         Observable<Event> es = Observable.create(new Observable.OnSubscribe<Event>() {
208 
209             @Override
210             public void call(final Subscriber<? super Event> observer) {
211                 System.out.println("*** Subscribing to EventStream ***");
212                 subscribeCounter.incrementAndGet();
213                 new Thread(new Runnable() {
214 
215                     @Override
216                     public void run() {
217                         for (int i = 0; i < count; i++) {
218                             Event e = new Event();
219                             e.source = i % groupCount;
220                             e.message = "Event-" + i;
221                             observer.onNext(e);
222                         }
223                         observer.onCompleted();
224                     }
225 
226                 }).start();
227             }
228 
229         });
230 
231         es.groupBy(new Func1<Event, Integer>() {
232 
233             @Override
234             public Integer call(Event e) {
235                 return e.source;
236             }
237         }).flatMap(new Func1<GroupedObservable<Integer, Event>, Observable<String>>() {
238 
239             @Override
240             public Observable<String> call(GroupedObservable<Integer, Event> eventGroupedObservable) {
241                 System.out.println("GroupedObservable Key: " + eventGroupedObservable.getKey());
242                 groupCounter.incrementAndGet();
243 
244                 return eventGroupedObservable.map(new Func1<Event, String>() {
245 
246                     @Override
247                     public String call(Event event) {
248                         return "Source: " + event.source + "  Message: " + event.message;
249                     }
250                 });
251 
252             }
253         }).subscribe(new Subscriber<String>() {
254 
255             @Override
256             public void onCompleted() {
257                 latch.countDown();
258             }
259 
260             @Override
261             public void onError(Throwable e) {
262                 e.printStackTrace();
263                 latch.countDown();
264             }
265 
266             @Override
267             public void onNext(String outputMessage) {
268                 System.out.println(outputMessage);
269                 eventCounter.incrementAndGet();
270             }
271         });
272 
273         latch.await(5000, TimeUnit.MILLISECONDS);
274         assertEquals(1, subscribeCounter.get());
275         assertEquals(groupCount, groupCounter.get());
276         assertEquals(count, eventCounter.get());
277 
278     }
279 
280     /*
281      * We will only take 1 group with 20 events from it and then unsubscribe.
282      */
283     @Test
284     public void testUnsubscribeOnNestedTakeAndSyncInfiniteStream() throws InterruptedException {
285         final AtomicInteger subscribeCounter = new AtomicInteger();
286         final AtomicInteger sentEventCounter = new AtomicInteger();
287         doTestUnsubscribeOnNestedTakeAndAsyncInfiniteStream(SYNC_INFINITE_OBSERVABLE_OF_EVENT(2, subscribeCounter, sentEventCounter), subscribeCounter);
288         Thread.sleep(500);
289         assertEquals(39, sentEventCounter.get());
290     }
291 
292     /*
293      * We will only take 1 group with 20 events from it and then unsubscribe.
294      */
295     @Test
296     public void testUnsubscribeOnNestedTakeAndAsyncInfiniteStream() throws InterruptedException {
297         final AtomicInteger subscribeCounter = new AtomicInteger();
298         final AtomicInteger sentEventCounter = new AtomicInteger();
299         doTestUnsubscribeOnNestedTakeAndAsyncInfiniteStream(ASYNC_INFINITE_OBSERVABLE_OF_EVENT(2, subscribeCounter, sentEventCounter), subscribeCounter);
300         Thread.sleep(500);
301         assertEquals(39, sentEventCounter.get());
302     }
303 
304     private void doTestUnsubscribeOnNestedTakeAndAsyncInfiniteStream(Observable<Event> es, AtomicInteger subscribeCounter) throws InterruptedException {
305         final AtomicInteger eventCounter = new AtomicInteger();
306         final AtomicInteger groupCounter = new AtomicInteger();
307         final CountDownLatch latch = new CountDownLatch(1);
308 
309         es.groupBy(new Func1<Event, Integer>() {
310 
311             @Override
312             public Integer call(Event e) {
313                 return e.source;
314             }
315         })
316                 .take(1) // we want only the first group
317                 .flatMap(new Func1<GroupedObservable<Integer, Event>, Observable<String>>() {
318 
319                     @Override
320                     public Observable<String> call(GroupedObservable<Integer, Event> eventGroupedObservable) {
321                         System.out.println("testUnsubscribe => GroupedObservable Key: " + eventGroupedObservable.getKey());
322                         groupCounter.incrementAndGet();
323 
324                         return eventGroupedObservable
325                                 .take(20) // limit to only 20 events on this group
326                                 .map(new Func1<Event, String>() {
327 
328                                     @Override
329                                     public String call(Event event) {
330                                         return "testUnsubscribe => Source: " + event.source + "  Message: " + event.message;
331                                     }
332                                 });
333 
334                     }
335                 }).subscribe(new Subscriber<String>() {
336 
337                     @Override
338                     public void onCompleted() {
339                         latch.countDown();
340                     }
341 
342                     @Override
343                     public void onError(Throwable e) {
344                         e.printStackTrace();
345                         latch.countDown();
346                     }
347 
348                     @Override
349                     public void onNext(String outputMessage) {
350                         System.out.println(outputMessage);
351                         eventCounter.incrementAndGet();
352                     }
353                 });
354 
355         if (!latch.await(2000, TimeUnit.MILLISECONDS)) {
356             fail("timed out so likely did not unsubscribe correctly");
357         }
358         assertEquals(1, subscribeCounter.get());
359         assertEquals(1, groupCounter.get());
360         assertEquals(20, eventCounter.get());
361         // sentEvents will go until 'eventCounter' hits 20 and then unsubscribes
362         // which means it will also send (but ignore) the 19/20 events for the other group
363         // It will not however send all 100 events.
364     }
365 
366     @Test
367     public void testUnsubscribeViaTakeOnGroupThenMergeAndTake() {
368         final AtomicInteger subscribeCounter = new AtomicInteger();
369         final AtomicInteger sentEventCounter = new AtomicInteger();
370         final AtomicInteger eventCounter = new AtomicInteger();
371 
372         SYNC_INFINITE_OBSERVABLE_OF_EVENT(4, subscribeCounter, sentEventCounter)
373                 .groupBy(new Func1<Event, Integer>() {
374 
375                     @Override
376                     public Integer call(Event e) {
377                         return e.source;
378                     }
379                 })
380                 // take 2 of the 4 groups
381                 .take(2)
382                 .flatMap(new Func1<GroupedObservable<Integer, Event>, Observable<String>>() {
383 
384                     @Override
385                     public Observable<String> call(GroupedObservable<Integer, Event> eventGroupedObservable) {
386                         return eventGroupedObservable
387                                 .map(new Func1<Event, String>() {
388 
389                                     @Override
390                                     public String call(Event event) {
391                                         return "testUnsubscribe => Source: " + event.source + "  Message: " + event.message;
392                                     }
393                                 });
394 
395                     }
396                 })
397                 .take(30).subscribe(new Action1<String>() {
398 
399                     @Override
400                     public void call(String s) {
401                         eventCounter.incrementAndGet();
402                         System.out.println("=> " + s);
403                     }
404 
405                 });
406 
407         assertEquals(30, eventCounter.get());
408         // we should send 28 additional events that are filtered out as they are in the groups we skip
409         assertEquals(58, sentEventCounter.get());
410     }
411 
412     @Test
413     public void testUnsubscribeViaTakeOnGroupThenTakeOnInner() {
414         final AtomicInteger subscribeCounter = new AtomicInteger();
415         final AtomicInteger sentEventCounter = new AtomicInteger();
416         final AtomicInteger eventCounter = new AtomicInteger();
417 
418         SYNC_INFINITE_OBSERVABLE_OF_EVENT(4, subscribeCounter, sentEventCounter)
419                 .groupBy(new Func1<Event, Integer>() {
420 
421                     @Override
422                     public Integer call(Event e) {
423                         return e.source;
424                     }
425                 })
426                 // take 2 of the 4 groups
427                 .take(2)
428                 .flatMap(new Func1<GroupedObservable<Integer, Event>, Observable<String>>() {
429 
430                     @Override
431                     public Observable<String> call(GroupedObservable<Integer, Event> eventGroupedObservable) {
432                         int numToTake = 0;
433                         if (eventGroupedObservable.getKey() == 1) {
434                             numToTake = 10;
435                         } else if (eventGroupedObservable.getKey() == 2) {
436                             numToTake = 5;
437                         }
438                         return eventGroupedObservable
439                                 .take(numToTake)
440                                 .map(new Func1<Event, String>() {
441 
442                                     @Override
443                                     public String call(Event event) {
444                                         return "testUnsubscribe => Source: " + event.source + "  Message: " + event.message;
445                                     }
446                                 });
447 
448                     }
449                 })
450                 .subscribe(new Action1<String>() {
451 
452                     @Override
453                     public void call(String s) {
454                         eventCounter.incrementAndGet();
455                         System.out.println("=> " + s);
456                     }
457 
458                 });
459 
460         assertEquals(15, eventCounter.get());
461         // we should send 22 additional events that are filtered out as they are skipped while taking the 15 we want
462         assertEquals(37, sentEventCounter.get());
463     }
464 
465     @Test
466     public void testStaggeredCompletion() throws InterruptedException {
467         final AtomicInteger eventCounter = new AtomicInteger();
468         final CountDownLatch latch = new CountDownLatch(1);
469         Observable.range(0, 100)
470                 .groupBy(new Func1<Integer, Integer>() {
471 
472                     @Override
473                     public Integer call(Integer i) {
474                         return i % 2;
475                     }
476                 })
477                 .flatMap(new Func1<GroupedObservable<Integer, Integer>, Observable<Integer>>() {
478 
479                     @Override
480                     public Observable<Integer> call(GroupedObservable<Integer, Integer> group) {
481                         if (group.getKey() == 0) {
482                             return group.delay(100, TimeUnit.MILLISECONDS).map(new Func1<Integer, Integer>() {
483                                 @Override
484                                 public Integer call(Integer t) {
485                                     return t * 10;
486                                 }
487 
488                             });
489                         } else {
490                             return group;
491                         }
492                     }
493                 })
494                 .subscribe(new Subscriber<Integer>() {
495 
496                     @Override
497                     public void onCompleted() {
498                         System.out.println("=> onCompleted");
499                         latch.countDown();
500                     }
501 
502                     @Override
503                     public void onError(Throwable e) {
504                         e.printStackTrace();
505                         latch.countDown();
506                     }
507 
508                     @Override
509                     public void onNext(Integer s) {
510                         eventCounter.incrementAndGet();
511                         System.out.println("=> " + s);
512                     }
513                 });
514 
515         if (!latch.await(3000, TimeUnit.MILLISECONDS)) {
516             fail("timed out");
517         }
518 
519         assertEquals(100, eventCounter.get());
520     }
521 
522     @Test(timeout = 1000)
523     public void testCompletionIfInnerNotSubscribed() throws InterruptedException {
524         final CountDownLatch latch = new CountDownLatch(1);
525         final AtomicInteger eventCounter = new AtomicInteger();
526         Observable.range(0, 100)
527                 .groupBy(new Func1<Integer, Integer>() {
528 
529                     @Override
530                     public Integer call(Integer i) {
531                         return i % 2;
532                     }
533                 })
534                 .subscribe(new Subscriber<GroupedObservable<Integer, Integer>>() {
535 
536                     @Override
537                     public void onCompleted() {
538                         latch.countDown();
539                     }
540 
541                     @Override
542                     public void onError(Throwable e) {
543                         e.printStackTrace();
544                         latch.countDown();
545                     }
546 
547                     @Override
548                     public void onNext(GroupedObservable<Integer, Integer> s) {
549                         eventCounter.incrementAndGet();
550                         System.out.println("=> " + s);
551                     }
552                 });
553         if (!latch.await(500, TimeUnit.MILLISECONDS)) {
554             fail("timed out - never got completion");
555         }
556         assertEquals(2, eventCounter.get());
557     }
558 
559     @Test
560     public void testIgnoringGroups() {
561         final AtomicInteger subscribeCounter = new AtomicInteger();
562         final AtomicInteger sentEventCounter = new AtomicInteger();
563         final AtomicInteger eventCounter = new AtomicInteger();
564 
565         SYNC_INFINITE_OBSERVABLE_OF_EVENT(4, subscribeCounter, sentEventCounter)
566                 .groupBy(new Func1<Event, Integer>() {
567 
568                     @Override
569                     public Integer call(Event e) {
570                         return e.source;
571                     }
572                 })
573                 .flatMap(new Func1<GroupedObservable<Integer, Event>, Observable<String>>() {
574 
575                     @Override
576                     public Observable<String> call(GroupedObservable<Integer, Event> eventGroupedObservable) {
577                         Observable<Event> eventStream = eventGroupedObservable;
578                         if (eventGroupedObservable.getKey() >= 2) {
579                             // filter these
580                             eventStream = eventGroupedObservable.filter(new Func1<Event, Boolean>() {
581 
582                                 @Override
583                                 public Boolean call(Event t1) {
584                                     return false;
585                                 }
586 
587                             });
588                         }
589 
590                         return eventStream
591                                 .map(new Func1<Event, String>() {
592 
593                                     @Override
594                                     public String call(Event event) {
595                                         return "testUnsubscribe => Source: " + event.source + "  Message: " + event.message;
596                                     }
597                                 });
598 
599                     }
600                 })
601                 .take(30).subscribe(new Action1<String>() {
602 
603                     @Override
604                     public void call(String s) {
605                         eventCounter.incrementAndGet();
606                         System.out.println("=> " + s);
607                     }
608 
609                 });
610 
611         assertEquals(30, eventCounter.get());
612         // we should send 30 additional events that are filtered out as they are in the groups we skip
613         assertEquals(60, sentEventCounter.get());
614     }
615 
616     @Test
617     public void testFirstGroupsCompleteAndParentSlowToThenEmitFinalGroupsAndThenComplete() throws InterruptedException {
618         final CountDownLatch first = new CountDownLatch(2); // there are two groups to first complete
619         final ArrayList<String> results = new ArrayList<String>();
620         Observable.create(new OnSubscribe<Integer>() {
621 
622             @Override
623             public void call(Subscriber<? super Integer> sub) {
624                 sub.onNext(1);
625                 sub.onNext(2);
626                 sub.onNext(1);
627                 sub.onNext(2);
628                 try {
629                     first.await();
630                 } catch (InterruptedException e) {
631                     sub.onError(e);
632                     return;
633                 }
634                 sub.onNext(3);
635                 sub.onNext(3);
636                 sub.onCompleted();
637             }
638 
639         }).groupBy(new Func1<Integer, Integer>() {
640 
641             @Override
642             public Integer call(Integer t) {
643                 return t;
644             }
645 
646         }).flatMap(new Func1<GroupedObservable<Integer, Integer>, Observable<String>>() {
647 
648             @Override
649             public Observable<String> call(final GroupedObservable<Integer, Integer> group) {
650                 if (group.getKey() < 3) {
651                     return group.map(new Func1<Integer, String>() {
652 
653                         @Override
654                         public String call(Integer t1) {
655                             return "first groups: " + t1;
656                         }
657 
658                     })
659                             // must take(2) so an onCompleted + unsubscribe happens on these first 2 groups
660                             .take(2).doOnCompleted(new Action0() {
661 
662                                 @Override
663                                 public void call() {
664                                     first.countDown();
665                                 }
666 
667                             });
668                 } else {
669                     return group.map(new Func1<Integer, String>() {
670 
671                         @Override
672                         public String call(Integer t1) {
673                             return "last group: " + t1;
674                         }
675 
676                     });
677                 }
678             }
679 
680         }).toBlocking().forEach(new Action1<String>() {
681 
682             @Override
683             public void call(String s) {
684                 results.add(s);
685             }
686 
687         });
688 
689         System.out.println("Results: " + results);
690         assertEquals(6, results.size());
691     }
692 
    /**
     * The parent emits 1, 2, 1, 2 and then blocks on a latch until the groups for keys 1 and 2
     * have both completed (each is limited to two values). It then emits 3 twice and completes.
     * The group for key 3 is consumed through {@code subscribeOn(Schedulers.newThread())} followed
     * by a 400 ms {@code delay}. The test expects all six mapped values to be collected.
     */
    @Test
    public void testFirstGroupsCompleteAndParentSlowToThenEmitFinalGroupsWhichThenSubscribesOnAndDelaysAndThenCompletes() throws InterruptedException {
        // visual separator on stderr ahead of the notification traces printed below
        System.err.println("----------------------------------------------------------------------------------------------");
        final CountDownLatch first = new CountDownLatch(2); // there are two groups to first complete
        final ArrayList<String> results = new ArrayList<String>();
        Observable.create(new OnSubscribe<Integer>() {

            @Override
            public void call(Subscriber<? super Integer> sub) {
                sub.onNext(1);
                sub.onNext(2);
                sub.onNext(1);
                sub.onNext(2);
                // hold back the values of group 3 until both early groups have counted the latch down
                try {
                    first.await();
                } catch (InterruptedException e) {
                    sub.onError(e);
                    return;
                }
                sub.onNext(3);
                sub.onNext(3);
                sub.onCompleted();
            }

        }).groupBy(new Func1<Integer, Integer>() {

            // every value is its own group key
            @Override
            public Integer call(Integer t) {
                return t;
            }

        }).flatMap(new Func1<GroupedObservable<Integer, Integer>, Observable<String>>() {

            @Override
            public Observable<String> call(final GroupedObservable<Integer, Integer> group) {
                if (group.getKey() < 3) {
                    return group.map(new Func1<Integer, String>() {

                        @Override
                        public String call(Integer t1) {
                            return "first groups: " + t1;
                        }

                    })
                            // must take(2) so an onCompleted + unsubscribe happens on these first 2 groups
                            .take(2).doOnCompleted(new Action0() {

                                @Override
                                public void call() {
                                    first.countDown();
                                }

                            });
                } else {
                    // the last group is subscribed on a new thread and its events are delayed by 400 ms
                    return group.subscribeOn(Schedulers.newThread()).delay(400, TimeUnit.MILLISECONDS).map(new Func1<Integer, String>() {

                        @Override
                        public String call(Integer t1) {
                            return "last group: " + t1;
                        }

                    }).doOnEach(new Action1<Notification<? super String>>() {

                        // trace every notification of the last group to stderr
                        @Override
                        public void call(Notification<? super String> t1) {
                            System.err.println("subscribeOn notification => " + t1);
                        }

                    });
                }
            }

        }).doOnEach(new Action1<Notification<? super String>>() {

            // trace every notification of the merged stream to stderr
            @Override
            public void call(Notification<? super String> t1) {
                System.err.println("outer notification => " + t1);
            }

        }).toBlocking().forEach(new Action1<String>() {

            @Override
            public void call(String s) {
                results.add(s);
            }

        });

        System.out.println("Results: " + results);
        // 2 values from each of the groups 1 and 2, plus 2 values from group 3
        assertEquals(6, results.size());
    }
784 
785     @Test
786     public void testFirstGroupsCompleteAndParentSlowToThenEmitFinalGroupsWhichThenObservesOnAndDelaysAndThenCompletes() throws InterruptedException {
787         final CountDownLatch first = new CountDownLatch(2); // there are two groups to first complete
788         final ArrayList<String> results = new ArrayList<String>();
789         Observable.create(new OnSubscribe<Integer>() {
790 
791             @Override
792             public void call(Subscriber<? super Integer> sub) {
793                 sub.onNext(1);
794                 sub.onNext(2);
795                 sub.onNext(1);
796                 sub.onNext(2);
797                 try {
798                     first.await();
799                 } catch (InterruptedException e) {
800                     sub.onError(e);
801                     return;
802                 }
803                 sub.onNext(3);
804                 sub.onNext(3);
805                 sub.onCompleted();
806             }
807 
808         }).groupBy(new Func1<Integer, Integer>() {
809 
810             @Override
811             public Integer call(Integer t) {
812                 return t;
813             }
814 
815         }).flatMap(new Func1<GroupedObservable<Integer, Integer>, Observable<String>>() {
816 
817             @Override
818             public Observable<String> call(final GroupedObservable<Integer, Integer> group) {
819                 if (group.getKey() < 3) {
820                     return group.map(new Func1<Integer, String>() {
821 
822                         @Override
823                         public String call(Integer t1) {
824                             return "first groups: " + t1;
825                         }
826 
827                     })
828                             // must take(2) so an onCompleted + unsubscribe happens on these first 2 groups
829                             .take(2).doOnCompleted(new Action0() {
830 
831                                 @Override
832                                 public void call() {
833                                     first.countDown();
834                                 }
835 
836                             });
837                 } else {
838                     return group.observeOn(Schedulers.newThread()).delay(400, TimeUnit.MILLISECONDS).map(new Func1<Integer, String>() {
839 
840                         @Override
841                         public String call(Integer t1) {
842                             return "last group: " + t1;
843                         }
844 
845                     });
846                 }
847             }
848 
849         }).toBlocking().forEach(new Action1<String>() {
850 
851             @Override
852             public void call(String s) {
853                 results.add(s);
854             }
855 
856         });
857 
858         System.out.println("Results: " + results);
859         assertEquals(6, results.size());
860     }
861 
862     @Test
863     public void testGroupsWithNestedSubscribeOn() throws InterruptedException {
864         final ArrayList<String> results = new ArrayList<String>();
865         Observable.create(new OnSubscribe<Integer>() {
866 
867             @Override
868             public void call(Subscriber<? super Integer> sub) {
869                 sub.onNext(1);
870                 sub.onNext(2);
871                 sub.onNext(1);
872                 sub.onNext(2);
873                 sub.onCompleted();
874             }
875 
876         }).groupBy(new Func1<Integer, Integer>() {
877 
878             @Override
879             public Integer call(Integer t) {
880                 return t;
881             }
882 
883         }).flatMap(new Func1<GroupedObservable<Integer, Integer>, Observable<String>>() {
884 
885             @Override
886             public Observable<String> call(final GroupedObservable<Integer, Integer> group) {
887                 return group.subscribeOn(Schedulers.newThread()).map(new Func1<Integer, String>() {
888 
889                     @Override
890                     public String call(Integer t1) {
891                         System.out.println("Received: " + t1 + " on group : " + group.getKey());
892                         return "first groups: " + t1;
893                     }
894 
895                 });
896             }
897 
898         }).doOnEach(new Action1<Notification<? super String>>() {
899 
900             @Override
901             public void call(Notification<? super String> t1) {
902                 System.out.println("notification => " + t1);
903             }
904 
905         }).toBlocking().forEach(new Action1<String>() {
906 
907             @Override
908             public void call(String s) {
909                 results.add(s);
910             }
911 
912         });
913 
914         System.out.println("Results: " + results);
915         assertEquals(4, results.size());
916     }
917 
918     @Test
919     public void testGroupsWithNestedObserveOn() throws InterruptedException {
920         final ArrayList<String> results = new ArrayList<String>();
921         Observable.create(new OnSubscribe<Integer>() {
922 
923             @Override
924             public void call(Subscriber<? super Integer> sub) {
925                 sub.onNext(1);
926                 sub.onNext(2);
927                 sub.onNext(1);
928                 sub.onNext(2);
929                 sub.onCompleted();
930             }
931 
932         }).groupBy(new Func1<Integer, Integer>() {
933 
934             @Override
935             public Integer call(Integer t) {
936                 return t;
937             }
938 
939         }).flatMap(new Func1<GroupedObservable<Integer, Integer>, Observable<String>>() {
940 
941             @Override
942             public Observable<String> call(final GroupedObservable<Integer, Integer> group) {
943                 return group.observeOn(Schedulers.newThread()).delay(400, TimeUnit.MILLISECONDS).map(new Func1<Integer, String>() {
944 
945                     @Override
946                     public String call(Integer t1) {
947                         return "first groups: " + t1;
948                     }
949 
950                 });
951             }
952 
953         }).toBlocking().forEach(new Action1<String>() {
954 
955             @Override
956             public void call(String s) {
957                 results.add(s);
958             }
959 
960         });
961 
962         System.out.println("Results: " + results);
963         assertEquals(4, results.size());
964     }
965 
966     private static class Event {
967         int source;
968         String message;
969 
970         @Override
971         public String toString() {
972             return "Event => source: " + source + " message: " + message;
973         }
974     }
975 
976     Observable<Event> ASYNC_INFINITE_OBSERVABLE_OF_EVENT(final int numGroups, final AtomicInteger subscribeCounter, final AtomicInteger sentEventCounter) {
977         return SYNC_INFINITE_OBSERVABLE_OF_EVENT(numGroups, subscribeCounter, sentEventCounter).subscribeOn(Schedulers.newThread());
978     };
979 
980     Observable<Event> SYNC_INFINITE_OBSERVABLE_OF_EVENT(final int numGroups, final AtomicInteger subscribeCounter, final AtomicInteger sentEventCounter) {
981         return Observable.create(new OnSubscribe<Event>() {
982 
983             @Override
984             public void call(final Subscriber<? super Event> op) {
985                 subscribeCounter.incrementAndGet();
986                 int i = 0;
987                 while (!op.isUnsubscribed()) {
988                     i++;
989                     Event e = new Event();
990                     e.source = i % numGroups;
991                     e.message = "Event-" + i;
992                     op.onNext(e);
993                     sentEventCounter.incrementAndGet();
994                 }
995                 op.onCompleted();
996             }
997 
998         });
999     };
1000 
1001     @Test
1002     public void testGroupByOnAsynchronousSourceAcceptsMultipleSubscriptions() throws InterruptedException {
1003 
1004         // choose an asynchronous source
1005         Observable<Long> source = Observable.interval(10, TimeUnit.MILLISECONDS).take(1);
1006 
1007         // apply groupBy to the source
1008         Observable<GroupedObservable<Boolean, Long>> stream = source.groupBy(IS_EVEN);
1009 
1010         // create two observers
1011         @SuppressWarnings("unchecked")
1012         Observer<GroupedObservable<Boolean, Long>> o1 = mock(Observer.class);
1013         @SuppressWarnings("unchecked")
1014         Observer<GroupedObservable<Boolean, Long>> o2 = mock(Observer.class);
1015 
1016         // subscribe with the observers
1017         stream.subscribe(o1);
1018         stream.subscribe(o2);
1019 
1020         // check that subscriptions were successful
1021         verify(o1, never()).onError(Matchers.<Throwable> any());
1022         verify(o2, never()).onError(Matchers.<Throwable> any());
1023     }
1024 
1025     private static Func1<Long, Boolean> IS_EVEN = new Func1<Long, Boolean>() {
1026 
1027         @Override
1028         public Boolean call(Long n) {
1029             return n % 2 == 0;
1030         }
1031     };
1032 
1033     private static Func1<Integer, Boolean> IS_EVEN2 = new Func1<Integer, Boolean>() {
1034 
1035         @Override
1036         public Boolean call(Integer n) {
1037             return n % 2 == 0;
1038         }
1039     };
1040 
1041     @Test
1042     public void testGroupByBackpressure() throws InterruptedException {
1043 
1044         TestSubscriber<String> ts = new TestSubscriber<String>();
1045 
1046         Observable.range(1, 4000)
1047                 .groupBy(IS_EVEN2)
1048                 .flatMap(new Func1<GroupedObservable<Boolean, Integer>, Observable<String>>() {
1049 
1050                     @Override
1051                     public Observable<String> call(final GroupedObservable<Boolean, Integer> g) {
1052                         return g.observeOn(Schedulers.computation()).map(new Func1<Integer, String>() {
1053 
1054                             @Override
1055                             public String call(Integer l) {
1056                                 if (g.getKey()) {
1057                                     try {
1058                                         Thread.sleep(1);
1059                                     } catch (InterruptedException e) {
1060                                     }
1061                                     return l + " is even.";
1062                                 } else {
1063                                     return l + " is odd.";
1064                                 }
1065                             }
1066 
1067                         });
1068                     }
1069 
1070                 }).subscribe(ts);
1071         ts.awaitTerminalEvent();
1072         ts.assertNoErrors();
1073     }
1074 
1075     <T, R> Func1<T, R> just(final R value) {
1076         return new Func1<T, R>() {
1077             @Override
1078             public R call(T t1) {
1079                 return value;
1080             }
1081         };
1082     }
1083 
1084     <T> Func1<Integer, T> fail(T dummy) {
1085         return new Func1<Integer, T>() {
1086             @Override
1087             public T call(Integer t1) {
1088                 throw new RuntimeException("Forced failure");
1089             }
1090         };
1091     }
1092 
1093     <T, R> Func1<T, R> fail2(R dummy2) {
1094         return new Func1<T, R>() {
1095             @Override
1096             public R call(T t1) {
1097                 throw new RuntimeException("Forced failure");
1098             }
1099         };
1100     }
1101 
1102     Func1<Integer, Integer> dbl = new Func1<Integer, Integer>() {
1103         @Override
1104         public Integer call(Integer t1) {
1105             return t1 * 2;
1106         }
1107     };
    /**
     * Selector obtained from {@code UtilityFunctions.identity()}; used in this class as the
     * key selector of several groupBy tests. Presumably returns its argument unchanged —
     * confirm against {@code UtilityFunctions}.
     */
    Func1<Integer, Integer> identity = UtilityFunctions.identity();
1109 
    /**
     * JUnit {@code @Before} hook: hands this test instance to
     * {@code MockitoAnnotations.initMocks} ahead of every test method.
     */
    @Before
    public void before() {
        MockitoAnnotations.initMocks(this);
    }
1114 
1115     @Test
1116     public void normalBehavior() {
1117         Observable<String> source = Observable.from(Arrays.asList(
1118                 "  foo",
1119                 " FoO ",
1120                 "baR  ",
1121                 "foO ",
1122                 " Baz   ",
1123                 "  qux ",
1124                 "   bar",
1125                 " BAR  ",
1126                 "FOO ",
1127                 "baz  ",
1128                 " bAZ ",
1129                 "    fOo    "
1130                 ));
1131 
1132         /**
1133          * foo FoO foO FOO fOo
1134          * baR bar BAR
1135          * Baz baz bAZ
1136          * qux
1137          * 
1138          */
1139         Func1<String, String> keysel = new Func1<String, String>() {
1140             @Override
1141             public String call(String t1) {
1142                 return t1.trim().toLowerCase();
1143             }
1144         };
1145         Func1<String, String> valuesel = new Func1<String, String>() {
1146             @Override
1147             public String call(String t1) {
1148                 return t1 + t1;
1149             }
1150         };
1151 
1152         Observable<String> m = source.groupBy(
1153                 keysel, valuesel).flatMap(new Func1<GroupedObservable<String, String>, Observable<String>>() {
1154 
1155             @Override
1156             public Observable<String> call(final GroupedObservable<String, String> g) {
1157                 System.out.println("-----------> NEXT: " + g.getKey());
1158                 return g.take(2).map(new Func1<String, String>() {
1159 
1160                     int count = 0;
1161 
1162                     @Override
1163                     public String call(String v) {
1164                         return g.getKey() + "-" + count++;
1165                     }
1166 
1167                 });
1168             }
1169 
1170         });
1171 
1172         TestSubscriber<String> ts = new TestSubscriber<String>();
1173         m.subscribe(ts);
1174         ts.awaitTerminalEvent();
1175         System.out.println("ts .get " + ts.getOnNextEvents());
1176         ts.assertNoErrors();
1177         assertEquals(ts.getOnNextEvents(),
1178                 Arrays.asList("foo-0", "foo-1", "bar-0", "foo-0", "baz-0", "qux-0", "bar-1", "bar-0", "foo-1", "baz-1", "baz-0", "foo-0"));
1179 
1180     }
1181 
1182     @Test
1183     public void keySelectorThrows() {
1184         Observable<Integer> source = Observable.just(0, 1, 2, 3, 4, 5, 6);
1185 
1186         Observable<Integer> m = source.groupBy(fail(0), dbl).flatMap(FLATTEN_INTEGER);
1187 
1188         TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
1189         m.subscribe(ts);
1190         ts.awaitTerminalEvent();
1191         assertEquals(1, ts.getOnErrorEvents().size());
1192         assertEquals(0, ts.getOnNextEvents().size());
1193     }
1194 
1195     @Test
1196     public void valueSelectorThrows() {
1197         Observable<Integer> source = Observable.just(0, 1, 2, 3, 4, 5, 6);
1198 
1199         Observable<Integer> m = source.groupBy(identity, fail(0)).flatMap(FLATTEN_INTEGER);
1200         TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
1201         m.subscribe(ts);
1202         ts.awaitTerminalEvent();
1203         assertEquals(1, ts.getOnErrorEvents().size());
1204         assertEquals(0, ts.getOnNextEvents().size());
1205 
1206     }
1207 
1208     @Test
1209     public void innerEscapeCompleted() {
1210         Observable<Integer> source = Observable.just(0);
1211 
1212         Observable<Integer> m = source.groupBy(identity, dbl).flatMap(FLATTEN_INTEGER);
1213 
1214         TestSubscriber<Object> ts = new TestSubscriber<Object>();
1215         m.subscribe(ts);
1216         ts.awaitTerminalEvent();
1217         ts.assertNoErrors();
1218         System.out.println(ts.getOnNextEvents());
1219     }
1220 
1221     /**
1222      * Assert we get an IllegalStateException if trying to subscribe to an inner GroupedObservable more than once
1223      */
1224     @Test
1225     public void testExceptionIfSubscribeToChildMoreThanOnce() {
1226         Observable<Integer> source = Observable.just(0);
1227 
1228         final AtomicReference<GroupedObservable<Integer, Integer>> inner = new AtomicReference<GroupedObservable<Integer, Integer>>();
1229 
1230         Observable<GroupedObservable<Integer, Integer>> m = source.groupBy(identity, dbl);
1231 
1232         m.subscribe(new Action1<GroupedObservable<Integer, Integer>>() {
1233             @Override
1234             public void call(GroupedObservable<Integer, Integer> t1) {
1235                 inner.set(t1);
1236             }
1237         });
1238 
1239         inner.get().subscribe();
1240 
1241         @SuppressWarnings("unchecked")
1242         Observer<Integer> o2 = mock(Observer.class);
1243 
1244         inner.get().subscribe(o2);
1245 
1246         verify(o2, never()).onCompleted();
1247         verify(o2, never()).onNext(anyInt());
1248         verify(o2).onError(any(IllegalStateException.class));
1249     }
1250 
1251     @Test
1252     public void testError2() {
1253         Observable<Integer> source = Observable.concat(Observable.just(0),
1254                 Observable.<Integer> error(new TestException("Forced failure")));
1255 
1256         Observable<Integer> m = source.groupBy(identity, dbl).flatMap(FLATTEN_INTEGER);
1257 
1258         TestSubscriber<Object> ts = new TestSubscriber<Object>();
1259         m.subscribe(ts);
1260         ts.awaitTerminalEvent();
1261         assertEquals(1, ts.getOnErrorEvents().size());
1262         assertEquals(1, ts.getOnNextEvents().size());
1263     }
1264 
1265     @Test
1266     public void testgroupByBackpressure() throws InterruptedException {
1267         TestSubscriber<String> ts = new TestSubscriber<String>();
1268 
1269         Observable.range(1, 4000).groupBy(IS_EVEN2).flatMap(new Func1<GroupedObservable<Boolean, Integer>, Observable<String>>() {
1270 
1271             @Override
1272             public Observable<String> call(final GroupedObservable<Boolean, Integer> g) {
1273                 return g.doOnCompleted(new Action0() {
1274 
1275                     @Override
1276                     public void call() {
1277                         System.out.println("//////////////////// COMPLETED-A");
1278                     }
1279 
1280                 }).observeOn(Schedulers.computation()).map(new Func1<Integer, String>() {
1281 
1282                     int c = 0;
1283 
1284                     @Override
1285                     public String call(Integer l) {
1286                         if (g.getKey()) {
1287                             if (c++ < 400) {
1288                                 try {
1289                                     Thread.sleep(1);
1290                                 } catch (InterruptedException e) {
1291                                 }
1292                             }
1293                             return l + " is even.";
1294                         } else {
1295                             return l + " is odd.";
1296                         }
1297                     }
1298 
1299                 }).doOnCompleted(new Action0() {
1300 
1301                     @Override
1302                     public void call() {
1303                         System.out.println("//////////////////// COMPLETED-B");
1304                     }
1305 
1306                 });
1307             }
1308 
1309         }).doOnEach(new Action1<Notification<? super String>>() {
1310 
1311             @Override
1312             public void call(Notification<? super String> t1) {
1313                 System.out.println("NEXT: " + t1);
1314             }
1315 
1316         }).subscribe(ts);
1317         ts.awaitTerminalEvent();
1318         ts.assertNoErrors();
1319     }
1320 
1321     @Test
1322     public void testgroupByBackpressure2() throws InterruptedException {
1323 
1324         TestSubscriber<String> ts = new TestSubscriber<String>();
1325 
1326         Observable.range(1, 4000).groupBy(IS_EVEN2).flatMap(new Func1<GroupedObservable<Boolean, Integer>, Observable<String>>() {
1327 
1328             @Override
1329             public Observable<String> call(final GroupedObservable<Boolean, Integer> g) {
1330                 return g.take(2).observeOn(Schedulers.computation()).map(new Func1<Integer, String>() {
1331 
1332                     @Override
1333                     public String call(Integer l) {
1334                         if (g.getKey()) {
1335                             try {
1336                                 Thread.sleep(1);
1337                             } catch (InterruptedException e) {
1338                             }
1339                             return l + " is even.";
1340                         } else {
1341                             return l + " is odd.";
1342                         }
1343                     }
1344 
1345                 });
1346             }
1347 
1348         }).subscribe(ts);
1349         ts.awaitTerminalEvent();
1350         ts.assertNoErrors();
1351     }
1352 
1353     static Func1<GroupedObservable<Integer, Integer>, Observable<Integer>> FLATTEN_INTEGER = new Func1<GroupedObservable<Integer, Integer>, Observable<Integer>>() {
1354 
1355         @Override
1356         public Observable<Integer> call(GroupedObservable<Integer, Integer> t) {
1357             return t;
1358         }
1359 
1360     };
1361 
1362     @Test
1363     public void testGroupByWithNullKey() {
1364         final String[] key = new String[]{"uninitialized"};
1365         final List<String> values = new ArrayList<String>();
1366         Observable.just("a", "b", "c").groupBy(new Func1<String, String>() {
1367 
1368             @Override
1369             public String call(String value) {
1370                 return null;
1371             }
1372         }).subscribe(new Action1<GroupedObservable<String, String>>() {
1373 
1374             @Override
1375             public void call(GroupedObservable<String, String> groupedObservable) {
1376                 key[0] = groupedObservable.getKey();
1377                 groupedObservable.subscribe(new Action1<String>() {
1378 
1379                     @Override
1380                     public void call(String s) {
1381                         values.add(s);
1382                     }
1383                 });
1384             }
1385         });
1386         assertEquals(null, key[0]);
1387         assertEquals(Arrays.asList("a", "b", "c"), values);
1388     }
1389 
1390     @Test
1391     public void testGroupByUnsubscribe() {
1392         final Subscription s = mock(Subscription.class);
1393         Observable<Integer> o = Observable.create(
1394                 new OnSubscribe<Integer>() {
1395                     @Override
1396                     public void call(Subscriber<? super Integer> subscriber) {
1397                         subscriber.add(s);
1398                     }
1399                 }
1400         );
1401         o.groupBy(new Func1<Integer, Integer>() {
1402 
1403             @Override
1404             public Integer call(Integer integer) {
1405                 return null;
1406             }
1407         }).subscribe().unsubscribe();
1408         verify(s).unsubscribe();
1409     }
1410 
1411     @Test
1412     public void testGroupByShouldPropagateError() {
1413         final Throwable e = new RuntimeException("Oops");
1414         final TestSubscriber<Integer> inner1 = new TestSubscriber<Integer>();
1415         final TestSubscriber<Integer> inner2 = new TestSubscriber<Integer>();
1416 
1417         final TestSubscriber<GroupedObservable<Integer, Integer>> outer
1418                 = new TestSubscriber<GroupedObservable<Integer, Integer>>(new Subscriber<GroupedObservable<Integer, Integer>>() {
1419 
1420             @Override
1421             public void onCompleted() {
1422             }
1423 
1424             @Override
1425             public void onError(Throwable e) {
1426             }
1427 
1428             @Override
1429             public void onNext(GroupedObservable<Integer, Integer> o) {
1430                 if (o.getKey() == 0) {
1431                     o.subscribe(inner1);
1432                 } else {
1433                     o.subscribe(inner2);
1434                 }
1435             }
1436         });
1437         Observable.create(
1438                 new OnSubscribe<Integer>() {
1439                     @Override
1440                     public void call(Subscriber<? super Integer> subscriber) {
1441                         subscriber.onNext(0);
1442                         subscriber.onNext(1);
1443                         subscriber.onError(e);
1444                     }
1445                 }
1446         ).groupBy(new Func1<Integer, Integer>() {
1447 
1448             @Override
1449             public Integer call(Integer i) {
1450                 return i % 2;
1451             }
1452         }).subscribe(outer);
1453         assertEquals(Arrays.asList(e), outer.getOnErrorEvents());
1454         assertEquals(Arrays.asList(e), inner1.getOnErrorEvents());
1455         assertEquals(Arrays.asList(e), inner2.getOnErrorEvents());
1456     }
1457 }